from pyspark import SparkConf, SparkContext
import json
import os

# Point the PySpark worker processes at a local Python interpreter
os.environ['PYSPARK_PYTHON'] = r"D:/Python/Python3107/python.exe"
conf = SparkConf().setMaster("local[*]").setAppName("test_spark")
sc = SparkContext(conf=conf)
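# "local[*]" runs Spark locally, using as many worker threads as there are CPU cores.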

file_rdd = sc.textFile(r"D:/itheima/Python/pyspark案例/search_log.txt")
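# Each line is tab-separated; the fields used below suggest the layout:
# time, user_id, key_word, rank1, rank2, url (see requirement 4).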
# TODO Requirement 1: top 3 search times (hour precision)
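# x[0] is the time field; its first two characters give the hour
# (assuming an HH:MM:SS-style timestamp).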
result1 = file_rdd.map(lambda x: x.split("\t")).\
    map(lambda x: x[0][:2]).\
    map(lambda x: (x, 1)).\
    reduceByKey(lambda a, b: a + b).\
    sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
    take(3)
print("需求1的结果" , result1)

# TODO Requirement 2: top 3 search keywords
result2 = file_rdd.map(lambda x: x.split("\t")).\
    map(lambda x: (x[2], 1)).\
    reduceByKey(lambda a, b: a + b).\
    sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
    take(3)
print("需求2的结果", result2)

# TODO Requirement 3: find the hour in which the keyword "黑马程序员" is searched most often
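# "黑马程序员" (itheima, "dark horse programmer") is kept verbatim because the
# filter must match the keyword exactly as it appears in the log data.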
result3 = file_rdd.map(lambda x: x.split("\t")).\
    filter(lambda x: x[2] == "黑马程序员").\
    map(lambda x: x[0][:2]).\
    map(lambda x: (x, 1)).\
    reduceByKey(lambda a, b: a + b).\
    sortBy(lambda x: x[1], ascending=False, numPartitions=1).\
    take(1)
print("需求3的结果", result3)

# TODO Requirement 4: convert the records to JSON and write them to a file
# json.dumps (rather than saving the dict's Python repr, which uses single
# quotes and is not valid JSON) produces proper JSON lines; ensure_ascii=False
# keeps the Chinese keywords readable in the output.
file_rdd.map(lambda x: x.split("\t")).\
    map(lambda x: {"time": x[0], "user_id": x[1], "key_word": x[2], "rank1": x[3], "rank2": x[4], "url": x[5]}).\
    map(lambda d: json.dumps(d, ensure_ascii=False)).\
    saveAsTextFile(r"D:/itheima/Python/pyspark案例/output_json")
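
# A minimal sanity check of the saved output (a sketch; assumes the job above
# has finished writing the output_json directory):
for line in sc.textFile(r"D:/itheima/Python/pyspark案例/output_json").take(3):
    print(json.loads(line))

sc.stop()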
